Linux系统编程 | 您所在的位置:网站首页 › linux 消息队列 系统参数 › Linux系统编程 |
消息队列本质上是位于内核空间的链表,链表的每个节点都是一条消息。每一条消息都有自己的消息类型,消息类型用整数来表示,而且必须大于 0。每种类型的消息都被对应的链表所维护:
其中数字 1 表示类型为 1 的消息,数字2、3、4 类似。彩色块表示消息数据,它们被挂在对应类型的链表上。
值得注意的是,刚刚说过没有消息类型为 0 的消息,实际上,消息类型为 0 的链表记录了所有消息加入队列的顺序,其中红色箭头表示消息加入的顺序。
无论你是发送还是接收消息,消息的格式都必须按照规范来。简单的说,它一般长成下面这个样子:
所以,只要你保证首4字节(32 位 linux 下的 long)是一个整数就行了。 举个例子:
从上面可以看出,正文部分是什么数据类型都没关系,因为消息队列传递的是 2 进制数据,不一定非得是文本。
msgsnd 函数用于将数据发送到消息队列。如果该函数被信号打断,会设置 errno 为 EINTR。
参数 msqid:ipc 内核对象 id 参数 msgp:消息数据地址 参数 msgsz:消息正文部分的大小(不包含消息类型) 参数 msgflg:可选项 该值为 0:如果消息队列空间不够,msgsnd 会阻塞。 IPC_NOWAIT:直接返回,如果空间不够,会设置 errno 为 EAGIN.
返回值:0 表示成功,-1 失败并设置 errno。
msgrcv 函数从消息队列取出消息后,并将其从消息队列里删除。
参数 msqid:ipc 内核对象 id 参数 msgp:用来接收消息数据地址 参数 msgsz:消息正文部分的大小(不包含消息类型) 参数 msgtyp:指定获取哪种类型的消息
msgtyp = 0:获取消息队列中的第一条消息 msgtyp 0:获取类型为 msgtyp 的第一条消息,除非指定了 msgflg 为MSG_EXCEPT,这表示获取除了 msgtyp 类型以外的第一条消息。 msgtyp 0:获取类型 ≤|msgtyp|≤|msgtyp| 的第一条消息。 参数 msgflg:可选项。 如果为 0 表示没有消息就阻塞。 IPC_NOWAIT:如果指定类型的消息不存在就立即返回,同时设置 errno 为 ENOMSG MSG_EXCEPT:仅用于 msgtyp 0 的情况。表示获取类型不为 msgtyp 的消息 MSG_NOERROR:如果消息数据正文内容大于 msgsz,就将消息数据截断为 msgsz
程序 msg_send 和 msg_recv 分别用于向消息队列发送数据和接收数据。
msg_send 程序定义了一个结构体 Msg,消息正文部分是结构体 Person。该程序向消息队列发送了 10 条消息。 msg_send.c
程序 msg_send 第一次运行完后,内核中的消息队列大概像下面这样:
msg_recv 程序接收一个参数,表示接收哪种类型的消息。比如./msg_recv 4 表示接收类型为 4 的消息,并打印在屏幕。
先运行 msg_send,再运行 msg_recv。 接收所有消息
接收类型为 4 的消息
获取和设置消息队列的属性
msqid:消息队列标识符 cmd:控制指令 IPC_STAT:获得msgid的消息队列头数据到buf中 IPC_SET:设置消息队列的属性,要设置的属性需先存储在buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes buf:消息队列管理结构体。
返回值: 成功:0 出错:-1,错误原因存于error中 EACCESS:参数cmd为IPC_STAT,确无权限读取该消息队列 EFAULT:参数buf指向无效的内存地址 EIDRM:标识符为msqid的消息队列已被删除 EINVAL:无效的参数cmd或msqid EPERM:参数cmd为IPC_SET或IPC_RMID,却无足够的权限执行 把if((msgid=msgget(IPC_PRIVATE,0666))==-1) { printf("error111") exit(0) } 放到fork()函数之前就可以了。 创建消息队列需要在fork()之前,因为fork()产生的是两个进程,他们的资源是相互独立的。 fork()之后创建的消息队列,另一个进程不能识别。 linux中的进程通信分为三个部分:低级通信,管道通信和进程间通信IPC(inter process communication)。linux的低级通信主要用来传递进程的控制信号——文件锁和软中断信号机制。linux的进程间通信IPC有三个部分——①信号量,②共享内存和③消息队列。以下是我编写的linux进程通信的C语言实现代码。操作系统为redhat9.0,编辑器为vi,编译器采用gcc。下面所有实现代码均已经通过测试,运行无误。一.低级通信--信号通信 signal.c #include #include #include /*捕捉到信号sig之后,执行预先预定的动作函数*/ void sig_alarm(int sig) { printf("---the signal received is %d. /n", sig) signal(SIGINT, SIG_DFL)//SIGINT终端中断信号,SIG_DFL:恢复默认行为,SIN_IGN:忽略信号 } int main() { signal(SIGINT, sig_alarm)//捕捉终端中断信号 while(1) { printf("waiting here!/n") sleep(1) } return 0 } 二.管道通信 pipe.c #include #define BUFFER_SIZE 30 int main() { int x int fd[2] char buf[BUFFER_SIZE] char s[BUFFER_SIZE] pipe(fd)//创建管道 while((x=fork())==-1)//创建管道失败时,进入循环 /*进入子进程,子进程向管道中写入一个字符串*/ if(x==0) { sprintf(buf,"This is an example of pipe!/n") write(fd[1],buf,BUFFER_SIZE) exit(0) } /*进入父进程,父进程从管道的另一端读出刚才写入的字符串*/ else { wait(0)//等待子进程结束 read(fd[0],s,BUFFER_SIZE)//读出字符串,并将其储存在char s[]中 printf("%s",s)//打印字符串 } return 0 } 三.进程间通信——IPC ①信号量通信 sem.c #include #include #include #include types.h #include ipc.h #include sem.h /*联合体变量*/ union semun { int val//信号量初始值 struct semid_ds *buf unsigned short int *array struct seminfo *__buf } /*函数声明,信号量定义*/ static int set_semvalue(void)//设置信号量 static void del_semvalue(void)//删除信号量 static int semaphore_p(void)//执行P操作 static int semaphore_v(void)//执行V操作 static int sem_id//信号量标识符 int main(int argc, char *argv[]) { int i int pause_time char op_char = 'O' srand((unsigned int)getpid()) sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT)//创建一个信号量,IPC_CREAT表示创建一个新的信号量 /*如果有参数,设置信号量,修改字符*/ if (argc 1) { if (!set_semvalue()) { fprintf(stderr, "Failed to initialize semaphore/n") exit(EXIT_FAILURE) } op_char = 'X' sleep(5) } for(i = 0i 10i++) { /*执行P操作*/ if (!semaphore_p()) exit(EXIT_FAILURE) printf("%c", op_char) fflush(stdout) pause_time = rand() % 3 sleep(pause_time) printf("%c", op_char) fflush(stdout) /*执行V操作*/ if (!semaphore_v()) exit(EXIT_FAILURE) pause_time = rand() % 2 sleep(pause_time) } printf("/n%d - finished/n", getpid()) if (argc 1) { sleep(10) del_semvalue()//删除信号量 } exit(EXIT_SUCCESS) } /*设置信号量*/ static int set_semvalue(void) { union semun sem_union sem_union.val = 1 if (semctl(sem_id, 0, SETVAL, sem_union) == -1) return(0) return(1) } /*删除信号量*/ static void del_semvalue(void) { union semun sem_union if (semctl(sem_id, 0, IPC_RMID, sem_union) == -1) fprintf(stderr, "Failed to delete semaphore/n") } /*执行P操作*/ static int semaphore_p(void) { struct sembuf sem_b sem_b.sem_num = 0 sem_b.sem_op = -1/* P() */ sem_b.sem_flg = SEM_UNDO if (semop(sem_id, sem_b, 1) == -1) { fprintf(stderr, "semaphore_p failed/n") return(0) } return(1) } /*执行V操作*/ static int semaphore_v(void) { struct sembuf sem_b sem_b.sem_num = 0 sem_b.sem_op = 1/* V() */ sem_b.sem_flg = SEM_UNDO if (semop(sem_id, sem_b, 1) == -1) { fprintf(stderr, "semaphore_v failed/n") return(0) } return(1) } ②消息队列通信 send.c #include #include #include #include #include #include types.h #include ipc.h #include msg.h #define MAX_TEXT 512 /*用于消息收发的结构体--my_msg_type:消息类型,some_text:消息正文*/ struct my_msg_st { long int my_msg_type char some_text[MAX_TEXT] } int main() { int running = 1//程序运行标识符 struct my_msg_st some_data int msgid//消息队列标识符 char buffer[BUFSIZ] /*创建与接受者相同的消息队列*/ msgid = msgget((key_t)1234, 0666 | IPC_CREAT) if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d/n", errno) exit(EXIT_FAILURE) } /*向消息队列中发送消息*/ while(running) { printf("Enter some text: ") fgets(buffer, BUFSIZ, stdin) some_data.my_msg_type = 1 strcpy(some_data.some_text, buffer) if (msgsnd(msgid, (void *)some_data, MAX_TEXT, 0) == -1) { fprintf(stderr, "msgsnd failed/n") exit(EXIT_FAILURE) } if (strncmp(buffer, "end", 3) == 0) { running = 0 } } exit(EXIT_SUCCESS) } receive.c #include #include #include #include #include #include types.h #include ipc.h #include msg.h /*用于消息收发的结构体--my_msg_type:消息类型,some_text:消息正文*/ struct my_msg_st { long int my_msg_type char some_text[BUFSIZ] } int main() { int running = 1//程序运行标识符 int msgid//消息队列标识符 struct my_msg_st some_data long int msg_to_receive = 0//接收消息的类型--0表示msgid队列上的第一个消息 /*创建消息队列*/ msgid = msgget((key_t)1234, 0666 | IPC_CREAT) if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d/n", errno) exit(EXIT_FAILURE) } /*接收消息*/ while(running) { if (msgrcv(msgid, (void *)some_data, BUFSIZ,msg_to_receive, 0) == -1) { fprintf(stderr, "msgrcv failed with error: %d/n", errno) exit(EXIT_FAILURE) } printf("You wrote: %s", some_data.some_text) if (strncmp(some_data.some_text, "end", 3) == 0) { running = 0 } } /*删除消息队列*/ if (msgctl(msgid, IPC_RMID, 0) == -1) { fprintf(stderr, "msgctl(IPC_RMID) failed/n") exit(EXIT_FAILURE) } exit(EXIT_SUCCESS) } ③共享内存通信 share.h #define TEXT_SZ 2048 //申请共享内存大小 struct shared_use_st { int written_by_you//written_by_you为1时表示有数据写入,为0时表示数据已经被消费者提走 char some_text[TEXT_SZ] } producer.c #include #include #include #include #include types.h #include ipc.h #include shm.h #include "share.h" int main() { int running = 1//程序运行标志位 void *shared_memory = (void *)0 struct shared_use_st *shared_stuff char buffer[BUFSIZ] int shmid//共享内存标识符 /*创建共享内存*/ shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT) if (shmid == -1) { fprintf(stderr, "shmget failed/n") exit(EXIT_FAILURE) } /*将共享内存连接到一个进程的地址空间中*/ shared_memory = shmat(shmid, (void *)0, 0)//指向共享内存第一个字节的指针 if (shared_memory == (void *)-1) { fprintf(stderr, "shmat failed/n") exit(EXIT_FAILURE) } printf("Memory attached at %X/n", (int)shared_memory) shared_stuff = (struct shared_use_st *)shared_memory /*生产者写入数据*/ while(running) { while(shared_stuff-written_by_you == 1) { sleep(1) printf("waiting for client.../n") } printf("Enter some text: ") fgets(buffer, BUFSIZ, stdin) strncpy(shared_stuff-some_text, buffer, TEXT_SZ) shared_stuff-written_by_you = 1 if (strncmp(buffer, "end", 3) == 0) { running = 0 } } /*该函数用来将共享内存从当前进程中分离,仅使得当前进程不再能使用该共享内存*/ if (shmdt(shared_memory) == -1) { fprintf(stderr, "shmdt failed/n") exit(EXIT_FAILURE) } printf("producer exit./n") exit(EXIT_SUCCESS) } customer.c #include #include #include #include #include types.h #include ipc.h #include shm.h #include "share.h" int main() { int running = 1//程序运行标志位 void *shared_memory = (void *)0 struct shared_use_st *shared_stuff int shmid//共享内存标识符 srand((unsigned int)getpid()) /*创建共享内存*/ shmid = shmget((key_t)1234, sizeof(struct shared_use_st), 0666 | IPC_CREAT) if (shmid == -1) { fprintf(stderr, "shmget failed/n") exit(EXIT_FAILURE) } /*将共享内存连接到一个进程的地址空间中*/ shared_memory = shmat(shmid, (void *)0, 0)//指向共享内存第一个字节的指针 if (shared_memory == (void *)-1) { fprintf(stderr, "shmat failed/n") exit(EXIT_FAILURE) } printf("Memory attached at %X/n", (int)shared_memory) shared_stuff = (struct shared_use_st *)shared_memory shared_stuff-written_by_you = 0 /*消费者读取数据*/ while(running) { if (shared_stuff-written_by_you) { printf("You wrote: %s", shared_stuff-some_text) sleep( rand() % 4 ) shared_stuff-written_by_you = 0 if (strncmp(shared_stuff-some_text, "end", 3) == 0) { running = 0 } } } /*该函数用来将共享内存从当前进程中分离,仅使得当前进程不再能使用该共享内存*/ if (shmdt(shared_memory) == -1) { fprintf(stderr, "shmdt failed/n") exit(EXIT_FAILURE) } /*将共享内存删除,所有进程均不能再访问该共享内存*/ if (shmctl(shmid, IPC_RMID, 0) == -1) { fprintf(stderr, "shmctl(IPC_RMID) failed/n") exit(EXIT_FAILURE) } exit(EXIT_SUCCESS) } 摘自: 欢迎分享,转载请注明来源:内存溢出 原文地址:https://outofmemory.cn/yw/8567807.html |
CopyRight 2018-2019 实验室设备网 版权所有 |